[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
AWS re:Invent 2022 で発表された、AWS IoT Core での MQTT v5 対応に反応して、色々入門しています。
今回は、フォーマット識別要素を使用して、タイプの違う Payload をデコードする実装を試してみました。
なお、現時点(2022/11/30)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python
2 Payload Format Indicator/Content Type
MQTT v 3.1 では、Payload の内容をパースする要領は、全てクライアントの実装に任せられていました。 この為、Payload のフォーマットが一定でない場合は、その判別実装が必要でした。
MQTT v5 では、拡張されたプロパティに 2 つのフォーマット識別要素が追加されており、これを利用することで、判別のコストを大きく下げることができますま。
Property | Description | Input type | Command |
---|---|---|---|
Payload Format Indicator | Payload が、UTF-8 かどうかのブール値 | Byte | PUBLISH, CONNECT |
Content Type | コンテンツを説明する UTF-8 文字列 | UTF-8 string | PUBLISH, CONNECT |
(1) Payload Format Indicator
Payload の内容が、UTF-8 文字列か否かを示します。文字列を扱う Subscriber にとっては、有意義な識別要素となります。 このプロパティは、1 バイトしか消費されません。バイト列の場合、0、UTF-8 文字列の場合、1 をセットします。
(2) Content Type
Content Type は、ペイロードの中身のタイプを具体的に表現する識別要素です。
一般的には MIME タイプで表現されそうに見えますが、ここで指定されるものは、あくまで Publisher と Subscriber の間の取り決めです。
3 実装例
以下が、実装したコードです。
Connect の後、Payload Format Indicator及び、Content Typeを指定して、プレーンなテキストと JSON を Publish しています。 また、同時に Subscribe で自らが送信したデータを受信し、識別要素を頼りにデコードして表示しています。
import ssl import os import time import json import paho.mqtt.client as mqtt from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes endpoint = "xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com" port = 8883 topic = "sensors/device1" dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir), "certfile": "{}/certificates/client-cert.pem".format(dir), "keyfile": "{}/certificates/private-key.pem".format(dir), } def on_connect(client, user_data, flags, reason_code, properties=None): print("on_connect") client.subscribe(topic, qos=1) def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None): print("on_subscribe") def on_publish(client,userdata, result,properties=None): print("on_publish") def on_message(client, userdata, message): data = '' payload_format_indicator = message.properties.PayloadFormatIndicator content_type = message.properties.ContentType if(payload_format_indicator): payload = str(message.payload.decode("utf-8")) if(content_type == 'text/plain'): data = payload else: if(content_type == 'application/json'): data = json.loads(message.payload) print('on_message payload:{} PayloadFormatIndicator:{} ContentType:{}'.format(data, payload_format_indicator, content_type)) def main(): client = mqtt.Client("client_id", protocol = mqtt.MQTTv5) client.tls_set(certs["cafile"], certfile=certs["certfile"], keyfile=certs["keyfile"], cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) client.on_connect = on_connect client.on_subscribe = on_subscribe client.on_publish = on_publish client.on_message = on_message client.connect(endpoint, port, properties = None) client.loop_start() time.sleep(1) properties = Properties(PacketTypes.PUBLISH) # 1回目のpublish UTF-8 の文字列を送信する properties.PayloadFormatIndicator = 1 properties.ContentType = "text/plain" payload = "hello world" client.publish(topic, payload, qos=1, properties=properties) time.sleep(1) # 2回目のpublish JSONを送信する properties.PayloadFormatIndicator = 0 properties.ContentType = "application/json" payload = json.dumps({'message':'hello'}) client.publish(topic, payload, qos=1, properties=properties) time.sleep(1) if __name__ == "__main__": main()
実行した結果です。適切にエンコードできていることが確認できます。
% python3 index.py on_connect on_subscribe on_publish on_message payload:hello world PayloadFormatIndicator:1 ContentType:text/plain on_publish on_message payload:{'message': 'hello'} PayloadFormatIndicator:0 ContentType:application/json
4 最後に
今回は、フォーマット識別要素(Payload Format Indicator及び、Content Type)を使用して、タイプの違う Payload をデコードする実装を試してみました。
Content-Type は、AWS IoT Code に依存しているものではないので、例えば、仕様で3種類の JSON 形式を扱う場合、その3種類を識別できる Content Type を定義し、クライアントとサーバで共有すれば良いことになります。この拡張も、Subscriber 側の実装コストを下げるものとして有効に使えそうです。